package com.rtsapp.server.domain.mysql.sql.impl;

import com.rtsapp.server.domain.mysql.sql.MySQLTransaction;
import com.rtsapp.server.domain.mysql.sql.impl.op.PingTask;
import com.rtsapp.server.domain.mysql.sql.impl.op.PreparedStatementTask;
import com.rtsapp.server.logger.Logger;

import java.util.concurrent.BlockingQueue;

/**
 * 数据库工作线程
 */
public class DatabaseWorker implements Runnable {

    private static final Logger LOGGER =com.rtsapp.server.logger.LoggerFactory.getLogger( DatabaseWorker.class );

    /**
     * 默认的数据库工作一次的周期时间
     * TODO 先设置为50毫秒, 后续根据实际测试结果调整
     */
    private static final long DEFAULT_WORKER_PERIOD_MSEC = 50;
    private static final int MAX_BATCH_SIZE = 100;

    /**
     * 是否使用批处理模式进行处理
     * 目前测试结果: 数据库在远程的时候, 非批量模式性能更高, 请先保持isBatch为false,不要去改动
     */
    private static final boolean  isBatch = false;

    /**
     * 用于批处理模式的当前事务
     */
    private final  MySQLTransaction curTransaction = new MySQLTransaction();

    /**
     * 数据库操作队列
     */
    private final BlockingQueue<SQLOperation> _queue;
    /**
     * 操作的连接
     */
    private final MySQLConnection _connection;

    /**
     * 内部工作线程
     */
    private final Thread _workerThread;

    /**
     * 线程是否关闭的标识
     */
    private volatile boolean _cancelationToken;

    /**
     *
     */
    private final String _connIpPortDatabase;


    public DatabaseWorker(BlockingQueue<SQLOperation> newQueue, MySQLConnection connection, int queueIndex ) {
        _connection = connection;
        _queue = newQueue;
        _cancelationToken = false;
        _connIpPortDatabase = connection.getConnIpPortDatabase();
        _workerThread = new Thread( this, "DatabaseWorker-"+ queueIndex+":" + _connIpPortDatabase);
        _workerThread.start();
    }


    @Override
    public void run() {

        if (_queue == null) {
            LOGGER.error("数据库线程队列为空, 数据库线程启动失败" );
            return;
        }

        if( isBatch ){
            LOGGER.info( "数据库线程采用批处理模式" );
            workBatch();
        }else{
            LOGGER.info("数据库线程采用普通的一个接一个的模式");
            workOneByOne( );
        }

        LOGGER.info("数据库线程退出:" + Thread.currentThread().getName() );
    }

    private void workOneByOne() {
        while (true) {

            try {
                SQLOperation operation  = _queue.take();

                if (_cancelationToken) {
                    break;
                }

                if (operation == null) {
                    continue;
                }

                operation.setConnection(_connection);
                operation.call();

            } catch ( Throwable ex ) {
                LOGGER.error( "数据库线程报错:" , ex );
            }

        }

    }

    private void workBatch() {

        while (true) {
            try {

                // 线程退出
                if (_cancelationToken) {
                    break;
                }

                // 无数据休眠
                int size =  _queue.size();
                if( size < 1 ){
                    Thread.sleep( DEFAULT_WORKER_PERIOD_MSEC );
                    continue;
                }

                for( int i = 0; i < size; i++ ) {

                    SQLOperation operation = _queue.take();

                    if (operation == null) {
                        continue;
                    }

                    if( operation instanceof PreparedStatementTask ){

                        PreparedStatementTask stmtTask = (PreparedStatementTask)operation;
                        curTransaction.append( stmtTask.getStmt() );

                        if( curTransaction.size() >= MAX_BATCH_SIZE ){
                            executeAndClear( _connection, curTransaction );
                        }

                    }else{

                        // 先执行批处理缓存的未执行事务
                        executeAndClear( _connection, curTransaction );

                        // 然后执行现在的不可进行批处理的操作
                        operation.setConnection(_connection);
                        operation.call();
                    }
                }

                //执行批处理缓存的事务
                executeAndClear( _connection, curTransaction);

            }catch ( InterruptedException ex ){
                LOGGER.error( "数据库线程被中断:" , ex );
            } catch ( Throwable ex ) {
                LOGGER.error( "数据库线程报错:" , ex );
            }

        }
    }

    /**
     * 执行批处理事务，并将十事务中的操作队列清空
     * @param _connection
     * @param curTransaction
     */
    private void executeAndClear( MySQLConnection _connection,  MySQLTransaction curTransaction) {
        if( curTransaction.size() < 1 ){
            return;
        }

        _connection.executeTransaction( curTransaction );

        curTransaction.cleanup();
    }


    /**
     * 停止数据库工作线程
     * 通过设置关闭信号为true,来进行关闭
     * 必须给阻塞队列加入一个对象，保证数据库线程不会阻塞再_queue.take()处
     */
    public void stop(){
        //先设置为关闭
        _cancelationToken = true;
        _queue.add( new PingTask() );
    }

}